import json
import os
import pickle
import re
from time import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from keras import Input
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
# Read the Flickr8k token file; each line is "<image>.jpg#<n>\t<caption>".
token_path = "./Flickr_Data/Flickr_Data/Flickr_TextData/Flickr8k.token.txt"
with open(token_path) as token_file:
    captions = token_file.read().split('\n')[:-1]
print(captions[0])

# Group the five captions per image under the bare image id (extension dropped).
descriptions = {}
for row in captions:
    first, second = row.split('\t')
    image_name = first.split('.')[0]
    if descriptions.get(image_name) is None:
        descriptions[image_name] = []
    descriptions[image_name].append(second)
print(descriptions['1000268201_693b08cb0e'])
Path = 'Flickr_Data/Flickr_Data/Images/'
import cv2

# Sanity check: load one image and display it (OpenCV reads BGR, so
# convert to RGB before handing it to matplotlib).
sample = cv2.imread(Path + '1000268201_693b08cb0e.jpg')
sample = cv2.cvtColor(sample, cv2.COLOR_BGR2RGB)
plt.axis('off')
plt.imshow(sample)
def clean_data(sentence):
    """Normalize a caption: lowercase, strip non-letters, drop 1-char tokens.

    Returns the cleaned caption as a single space-joined string.
    (Removed a leftover debug print that spammed stdout for every caption.)
    """
    sentence = sentence.lower()
    # Collapse every run of non a-z characters (digits, punctuation) to a space.
    sentence = re.sub('[^a-z]+', " ", sentence)
    words = sentence.split()
    # Single-letter tokens ("a", stray characters) carry no signal.
    words = [w for w in words if len(w) > 1]
    return " ".join(words)
# Smoke test the cleaner on a sentence containing digits and a 1-char word.
sentence = clean_data(' A car has 2 headlights')
print(sentence)
# Apply the cleaner to every caption, rewriting each list in place.
for key, captions in descriptions.items():
    captions[:] = [clean_data(cap) for cap in captions]
print(descriptions['1000268201_693b08cb0e'])
# Persist the cleaned descriptions as real JSON. The original wrote
# str(dict) and round-tripped it through replace("'", '"'), which corrupts
# the data the moment any caption contains a quote character.
with open('descriptions_1.txt', 'w') as f:
    json.dump(descriptions, f)
with open('descriptions_1.txt') as f:
    descriptions = json.load(f)
print(descriptions['1000268201_693b08cb0e'])
# Vocabulary of unique words across all cleaned captions.
# (Was a list comprehension used purely for side effects — an anti-idiom
# that allocates a throwaway list of Nones; use a plain loop.)
vocab = set()
for caps in descriptions.values():
    for sentence in caps:
        vocab.update(sentence.split())
print(len(vocab))
# Flat list of every word occurrence (with repeats) for frequency counting.
# (Same side-effect-comprehension anti-idiom as above, replaced with a loop.)
total_words = []
for caps in descriptions.values():
    for desc in caps:
        total_words.extend(desc.split())
print(len(total_words))
import collections

# Count word frequencies and keep only words seen more than `threshold`
# times; rarer words would get unreliable embeddings.
counter = collections.Counter(total_words)
freq_cnt = dict(counter)
print(len(freq_cnt.keys()))
sorted_freq_cnt = sorted(freq_cnt.items(), reverse=True, key=lambda kv: kv[1])
threshold = 10
freq_cnt_sorted = [kv for kv in sorted_freq_cnt if kv[1] > threshold]
total_words = [kv[0] for kv in freq_cnt_sorted]
print(len(total_words))
# Image-id lists for the standard Flickr8k train/test split files.
train_split = "Flickr_Data/Flickr_Data/Flickr_TextData/Flickr_8k.trainImages.txt"
test_split = "Flickr_Data/Flickr_Data/Flickr_TextData/Flickr_8k.testImages.txt"
with open(train_split) as f:
    train_text_data = f.read()
with open(test_split) as f:
    test_text_data = f.read()
# Drop the trailing empty entry and strip the ".jpg" extension.
train_image = [line.split('.')[0] for line in train_text_data.split('\n')[:-1]]
test_image = [line.split('.')[0] for line in test_text_data.split('\n')[:-1]]
def _wrap_with_tokens(image_ids):
    """Map image id -> its captions wrapped with startseq/endseq markers.

    The markers give the decoder explicit begin/end-of-sentence symbols.
    """
    wrapped = {}
    for img_id in image_ids:
        wrapped[img_id] = ["startseq " + cap + " endseq"
                           for cap in descriptions[img_id]]
    return wrapped


# The wrapping loop was copy-pasted for train and test; share one helper.
train = _wrap_with_tokens(train_image)
test = _wrap_with_tokens(test_image)
from keras.layers import *
from keras.models import *
from keras.applications.resnet50 import ResNet50, preprocess_input
from keras.preprocessing import image

# ImageNet-pretrained ResNet50. Chopping off the final softmax layer
# (layers[-2]) leaves the 2048-d pooled feature vector as the output.
model = ResNet50(weights='imagenet', input_shape=(224, 224, 3))
model.summary()
model_new = Model(model.input, model.layers[-2].output)
def preprocess_image(img):
    """Load an image file and return a (1, 224, 224, 3) ResNet50-ready batch."""
    loaded = image.load_img(img, target_size=(224, 224))
    arr = image.img_to_array(loaded)
    batch = np.expand_dims(arr, axis=0)
    # preprocess_input applies the ImageNet channel normalization.
    return preprocess_input(batch)
# Visual check of the preprocessed sample (colours look off because the
# pixel values are mean-normalized at this point).
img = preprocess_image(Path + '1000268201_693b08cb0e.jpg')[0]
plt.imshow(img)
def encode_image(img):
    """Return the flat 2048-d ResNet50 feature vector for the image at path `img`."""
    batch = preprocess_image(img)
    features = model_new.predict(batch)
    return features.reshape((-1,))


encode_image(Path + '1000268201_693b08cb0e.jpg')
# Encode every training image once and cache the feature vectors on disk.
start = time()
encoded_vector = {}
for ix, img_id in enumerate(train):
    img_path = Path + img_id + ".jpg"
    encoded_vector[img_id] = encode_image(img_path)
    if ix % 100 == 0:
        print('encoding in process time stamp %d' % ix)
end = time()
print("total time=", end - start)

# "!mkdir saved" was an IPython shell escape — a SyntaxError in plain
# Python — and failed on reruns once the directory existed. Use os.makedirs.
os.makedirs("saved", exist_ok=True)
with open("saved/encoded_train_images.pkl", 'wb') as f:
    pickle.dump(encoded_vector, f)
# Same encoding pass over the held-out test images.
start = time()
to_encode_vector = {}
for ix, img_id in enumerate(test):
    img_path = Path + img_id + ".jpg"
    to_encode_vector[img_id] = encode_image(img_path)
    if ix % 100 == 0:
        print('encoding in process time stamp %d' % ix)
end = time()
print("total test encode time=", end - start)

# The original dumped the identical pickle twice back-to-back and then had
# a bare `encoded_vector` expression (a leftover notebook cell echo, a no-op
# in a script); one dump is sufficient.
with open("saved/encoded_test_images.pkl", 'wb') as f:
    pickle.dump(to_encode_vector, f)
# Longest caption length in tokens across the training set — every input
# sequence is padded to this length.
max_len = max(
    (len(cap.split()) for caps in train.values() for cap in caps),
    default=0,
)
print(max_len)
# Word <-> index lookup tables; index 0 is reserved for the padding token.
word_to_idx = {}
idx_to_word = {}
for i, word in enumerate(total_words):
    word_to_idx[word] = i + 1
    idx_to_word[i + 1] = word
print(len(word_to_idx))

# The original hard-coded the start/end token indices as 1846/1847, which
# silently breaks whenever the frequency threshold (and so the vocabulary
# size) changes; derive the next two free indices instead.
start_idx = len(word_to_idx) + 1
word_to_idx['startseq'] = start_idx
idx_to_word[start_idx] = 'startseq'
word_to_idx['endseq'] = start_idx + 1
idx_to_word[start_idx + 1] = 'endseq'

# +1 accounts for padding index 0, which never maps to a word.
vocab_size = len(idx_to_word) + 1
print(vocab_size)
def data_preparation(max_len, train, word_to_idx, vocab_size, encoded_vector, batch_size):
    """Infinite generator of ([image_features, partial_seq], next_word) batches.

    For each caption "startseq w1 ... endseq" every prefix is paired with
    the word that follows it, so the model learns next-word prediction
    conditioned on the image features. Batches are counted per image
    (batch_size images), not per training sample.
    """
    X1, X2, y = [], [], []
    n = 0
    while True:
        for key, desc_list in train.items():
            n += 1
            photo = encoded_vector[key]
            for desc in desc_list:
                # Bug fix: the original filtered on the undefined name
                # `word` instead of the loop variable, raising NameError.
                seq = [word_to_idx[w] for w in desc.split() if w in word_to_idx]
                for i in range(1, len(seq)):
                    xi = pad_sequences([seq[:i]], maxlen=max_len,
                                       value=0, padding='post')[0]
                    # Bug fix: "to_caategorical" was a typo for to_categorical.
                    yi = to_categorical([seq[i]], num_classes=vocab_size)[0]
                    X1.append(photo)
                    X2.append(xi)
                    y.append(yi)
                if n == batch_size:
                    # Keras expects an (inputs, targets) tuple, not a list.
                    yield ([np.array(X1), np.array(X2)], np.array(y))
                    X1, X2, y = [], [], []
                    n = 0
# Load the 50-d GloVe vectors into a word -> np.ndarray lookup.
# The original used a bare open()/close() pair (leaking the handle on any
# parse error) and echoed embedding_idx['apple'] twice — notebook leftovers.
embedding_idx = {}
with open('glove.6B.50d.txt', encoding='utf-8') as glove_file:
    for line in glove_file:
        values = line.split()
        word = values[0]
        embedding_idx[word] = np.array(values[1:], dtype='float')
embedding_idx['apple']
def get_embedding_matrix():
    """Build a (vocab_size, 50) matrix of GloVe vectors aligned with word_to_idx.

    Rows for words absent from GloVe — and row 0, the padding index —
    stay all-zero.
    """
    emb_dim = 50
    matrix = np.zeros((vocab_size, emb_dim))
    for word, idx in word_to_idx.items():
        vector = embedding_idx.get(word)
        if vector is not None:
            matrix[idx] = vector
    return matrix


embedding_matrix = get_embedding_matrix()
print(embedding_matrix.shape)
embedding_matrix[1]
# Decoder graph: the 2048-d image features and the partial caption are each
# projected to 256-d, added together, and mapped to a softmax over the vocab.
input_img_features = Input(shape=(2048,))
img_branch = Dropout(0.3)(input_img_features)
img_branch = Dense(256, activation='relu')(img_branch)

input_captions = Input(shape=(max_len,))
cap_branch = Embedding(input_dim=vocab_size, output_dim=50,
                       mask_zero=True)(input_captions)
cap_branch = Dropout(0.3)(cap_branch)
cap_branch = LSTM(256)(cap_branch)

merged = add([img_branch, cap_branch])
merged = Dense(256, activation='relu')(merged)
outputs = Dense(vocab_size, activation='softmax')(merged)
modelF = Model(inputs=[input_img_features, input_captions], outputs=outputs)
modelF.summary()
# "mkdir model_weights" was a shell command pasted into Python source
# (a SyntaxError); create the checkpoint directory portably instead.
os.makedirs("model_weights", exist_ok=True)

# layers[2] is the Embedding layer: seed it with GloVe vectors and freeze it.
modelF.layers[2].set_weights([embedding_matrix])
modelF.layers[2].trainable = False
modelF.compile(loss='categorical_crossentropy', optimizer='adam')

epochs = 20
batch_size = 3
# steps_per_epoch must be an integer; true division produced a float.
steps = len(train) // batch_size
def train_data():
    """Run `epochs` training epochs, checkpointing the model after each one."""
    for epoch in range(epochs):
        # Recreate the generator each epoch so iteration restarts cleanly.
        gen = data_preparation(max_len, train, word_to_idx, vocab_size,
                               encoded_vector, batch_size)
        modelF.fit_generator(gen, epochs=1, steps_per_epoch=steps, verbose=1)
        modelF.save('./model_weights/model_' + str(epoch) + '.h5')


train_data()
model = load_model('./model_9.h5')
def predict_captions(photo):
    """Greedily decode a caption for one encoded image.

    `photo` is a (1, 2048) feature vector. At each step the caption-so-far
    is fed back in and the argmax next word is appended, stopping at
    'endseq' or after max_len words. The start/end markers are stripped
    from the returned string.
    """
    in_text = 'startseq'
    for _ in range(max_len):
        seq = [word_to_idx[w] for w in in_text.split() if w in word_to_idx]
        seq = pad_sequences([seq], maxlen=max_len, padding='post')
        scores = model.predict([photo, seq])
        next_word = idx_to_word[scores.argmax()]
        in_text += ' ' + next_word
        if next_word == 'endseq':
            break
    return " ".join(in_text.split()[1:-1])
import cv2

plt.style.use('seaborn')

# Caption 15 randomly chosen test images and display each with its caption.
all_images_names = list(to_encode_vector.keys())
for _ in range(15):
    idx = np.random.randint(0, 1000)
    image_name = all_images_names[idx]
    photo_2048 = to_encode_vector[image_name].reshape((1, 2048))
    ima = cv2.imread(Path + image_name + '.jpg')
    caption = predict_captions(photo_2048)
    plt.title(caption)
    plt.imshow(ima)
    plt.axis('off')
    plt.show()
print(len(train))